Python examples of downloading time-series (trendbar) data via the cTrader Open API
Basic: a single request for daily bars over a fixed date range
import datetime as dt
import json
import pandas as pd
from ctrader_open_api import Client, Protobuf, TcpProtocol, EndPoints
import ctrader_open_api.messages.OpenApiMessages_pb2 as OA
import ctrader_open_api.messages.OpenApiModelMessages_pb2 as OAModel
import ctrader_open_api.messages.OpenApiCommonMessages_pb2 as OACommon
import ctrader_open_api.messages.OpenApiCommonModelMessages_pb2 as OAModelCommon
from twisted.internet import reactor
# Payload type used throughout to recognize ProtoOAErrorRes messages.
PROTO_OA_ERROR_RES_PAYLOAD_TYPE = OA.ProtoOAErrorRes().payloadType

# connection, authentication and program lifecycle
# Read API credentials with a context manager so the file handle is closed
# promptly (the original `json.load(open(...))` leaked the handle until GC).
with open('credentials.json') as _credentials_file:
    credentials = json.load(_credentials_file)
client = Client(EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
def onAccAuth(message):
    """Account-auth response callback: start the workload on success."""
    if message.payloadType != PROTO_OA_ERROR_RES_PAYLOAD_TYPE:
        print('account authenticated')
        main()
    else:
        print('account authentication failed', '\n')
        print(Protobuf.extract(message), '\n')
def onAppAuth(message):
    """App-auth response callback: on success, authenticate the trading account."""
    if message.payloadType == PROTO_OA_ERROR_RES_PAYLOAD_TYPE:
        print('app authentication failed', '\n')
        print(Protobuf.extract(message), '\n')
        return
    print('app authenticated')
    # Next handshake step: authorize the specific trading account.
    account_req = OA.ProtoOAAccountAuthReq()
    account_req.ctidTraderAccountId = credentials['accountId']
    account_req.accessToken = credentials['accessToken']
    client.send(account_req).addCallbacks(onAccAuth, onError)
def onError(failure):
    """Errback for any deferred: log the wrapped exception."""
    reason = repr(failure.value)
    print('err: ', reason)
def connected(client):
    """Connection callback: begin the handshake by authorizing the application."""
    print('connected')
    app_req = OA.ProtoOAApplicationAuthReq()
    app_req.clientId = credentials['clientId']
    app_req.clientSecret = credentials['clientSecret']
    # err if no response under 20 secs
    d = client.send(app_req, responseTimeoutInSeconds=20)
    d.addCallbacks(onAppAuth, onError)
def disconnected(client, reason):
    """Disconnection callback: report why the connection was dropped."""
    print('disconnected: ', reason)
def onMsg(client, message):
    """Generic message hook: ignore heartbeats and auth responses, log the rest."""
    ignored_types = {
        OACommon.ProtoHeartbeatEvent().payloadType,
        OA.ProtoOAAccountAuthRes().payloadType,
        OA.ProtoOAApplicationAuthRes().payloadType,
    }
    if message.payloadType not in ignored_types:
        print('message received')
# get historical daily bars
def onTrendbar(message):
    """Handle a trendbars response: decode the bars and write them to bars.csv.

    Trendbar prices arrive as scaled integers: `low` is an absolute price
    scaled by 1e5, and open/high/close are deltas relative to `low`.
    NOTE(review): the fixed 1e5 scale assumes a 5-digit symbol — confirm
    against the symbol's actual digits.
    """
    response = Protobuf.extract(message)
    if message.payloadType == OA.ProtoOAErrorRes().payloadType:
        print(response)
        return
    bars = []
    for bar in response.trendbar:
        timestamp = bar.utcTimestampInMinutes * 60  # minutes -> seconds
        # Renamed from open/high/low/close to avoid shadowing the `open`
        # builtin (and for clarity).
        open_price = (bar.low + bar.deltaOpen) / 100000.0
        high_price = (bar.low + bar.deltaHigh) / 100000.0
        low_price = bar.low / 100000.0
        close_price = (bar.low + bar.deltaClose) / 100000.0
        bars.append([timestamp, open_price, high_price, low_price, close_price, bar.volume])
    pd.DataFrame(bars, columns=['timestamp','open','high','low','close','volume']).to_csv('bars.csv', index=False)
    bars_df = pd.read_csv('bars.csv', index_col='timestamp')
    bars_df.index = pd.to_datetime(bars_df.index, unit='s', utc=True)  # optional
def main():
    """Request daily XAUUSD bars for 1 Jan 2025 through 23 Jan 2025."""
    start = dt.datetime(2025, 1, 1, tzinfo=dt.UTC)
    end = dt.datetime(2025, 1, 23, tzinfo=dt.UTC)
    req = OA.ProtoOAGetTrendbarsReq()
    req.symbolId = 41  # 'XAUUSD'
    req.ctidTraderAccountId = credentials['accountId']
    req.period = OAModel.ProtoOATrendbarPeriod.D1
    # The API expects epoch milliseconds.
    req.fromTimestamp = int(start.timestamp()) * 1000
    req.toTimestamp = int(end.timestamp()) * 1000
    client.send(req).addCallbacks(onTrendbar, onError)
# Register lifecycle callbacks BEFORE starting the service so no early
# events are missed, then hand control to the Twisted reactor loop.
client.setConnectedCallback(connected)
client.setDisconnectedCallback(disconnected)
client.setMessageReceivedCallback(onMsg)
client.startService()
reactor.run()  # blocks until reactor.stop() is called
Beginner-level multi-requesting: chained chunked requests that walk backwards in time to download the full available history
# get full timeseries data
import datetime as dt
import json
import os
from timeit import default_timer as timer
import pandas as pd
CHUNK_SIZE = {'days': 1} # span covered in 1 request (timedelta kwargs)
THRESHOLD_OF_EMPTY_RESPONSES = 60 # cutoff logic: stop after this many consecutive empty replies
outfile = 'bars.raw.csv'  # raw unscaled trendbar rows, appended chunk by chunk
statefile = 'last_fromTimestamp.txt'  # resume point persisted between runs
cols = ['utcTimestampInMinutes','low','deltaOpen','deltaClose','deltaHigh','volume']
request_timer = 0 # for measuring time taken by requests
empty_resp_counter = 0 # for counting sequential empty responses
def main():
    """Compute the next [from, to) chunk — resuming from statefile if present —
    and issue the first trendbar request; onTrendbar chains the remaining ones."""
    sym_id = 41  # 'XAUUSD'
    if os.path.exists(statefile):
        with open(statefile) as f:
            last_saved_fromTimestamp = f.read()
        # Use UTC here as in the fresh-start branch below: the original naive
        # local-time fromtimestamp() is ambiguous around DST transitions.
        last_fromDatetime = dt.datetime.fromtimestamp(
            int(last_saved_fromTimestamp) / 1000, tz=dt.UTC)
        # date range must not be redundant between runs (otherwise creates
        # duplicates in outfile)
        start = last_fromDatetime - dt.timedelta(**CHUNK_SIZE)
        end = last_fromDatetime
    else:
        now = dt.datetime.now(tz=dt.UTC)
        today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
        start = today_midnight - dt.timedelta(**CHUNK_SIZE)
        end = today_midnight
    fr, to = [int(i.timestamp() * 1000) for i in [start, end]]
    print('FROM: TO: SECS_TOOK: BARS:')
    reqBars(sym_id, fr, to)
def reqBars(sym_id, fr, to):
    """Send one M1 trendbars request for [fr, to) (epoch milliseconds)."""
    global request_timer
    req = OA.ProtoOAGetTrendbarsReq()
    req.ctidTraderAccountId = credentials['accountId']
    req.symbolId = sym_id
    req.period = OAModel.ProtoOATrendbarPeriod.M1
    req.fromTimestamp = fr
    req.toTimestamp = to
    d = client.send(req, responseTimeoutInSeconds=20)
    # [fr, to] are forwarded to onTrendbar as extra callback args.
    d.addCallbacks(onTrendbar, onError, [fr, to])
    request_timer = timer()  # stamp after dispatch to time the round trip
def onTrendbar(message, begin, end):
    """Handle one chunk response: persist bars, track progress, then either
    request the previous chunk or finalize bars.csv and stop the reactor.

    `begin`/`end` are the requested epoch-ms bounds, forwarded by reqBars
    via addCallbacks.
    """
    global request_timer, empty_resp_counter
    # Check for a server error BEFORE touching response.trendbar: a
    # ProtoOAErrorRes has no `trendbar` field, so the original code raised
    # AttributeError (and never returned) instead of reporting the error.
    if message.payloadType == OA.ProtoOAErrorRes().payloadType:
        print('server sent error')
        print(Protobuf.extract(message))
        return
    response = Protobuf.extract(message)
    # print some stuff about the chunk
    chunk_info = [
        *[dt.datetime.fromtimestamp(i / 1000) for i in [begin, end]],
        round((timer() - request_timer)),
        len(response.trendbar),
    ]
    print('\t\t'.join(map(str, chunk_info)))
    # process chunk response
    if len(response.trendbar) > 0:
        chunk_bars = [[str(getattr(i, k)) for k in cols] for i in response.trendbar]
        chunk_str = '\n'.join([','.join(i) for i in chunk_bars]) + '\n'
        with open(outfile, 'a', newline='') as f:
            f.write(chunk_str)
    # update state file
    with open(statefile, 'w') as f:
        f.write(str(begin))
    # count up empty server responses (used to detect when reached end of data)
    empty_resp_counter = 0 if len(response.trendbar) > 0 else (empty_resp_counter + 1)
    # assume we reached end of data if last n requests had no data
    if empty_resp_counter > THRESHOLD_OF_EMPTY_RESPONSES:
        _finalize()
        return
    # request next chunk (walk backwards in time by one CHUNK_SIZE)
    prev_frm = dt.datetime.fromtimestamp(begin / 1000)
    new_frm = prev_frm - dt.timedelta(**CHUNK_SIZE)
    fr, to = [int(i.timestamp() * 1000) for i in [new_frm, prev_frm]]
    reqBars(response.symbolId, fr, to)

def _finalize():
    """Convert the raw appended rows into scaled OHLCV bars.csv and shut down."""
    df = pd.read_csv(outfile, names=cols)
    df2 = pd.DataFrame()
    df2['timestamp'] = df['utcTimestampInMinutes'] * 60  # minutes -> seconds
    # Prices are integers scaled by 1e5; open/high/close are deltas from `low`.
    df2['open'] = (df['low'] + df['deltaOpen']) / 100_000
    df2['high'] = (df['low'] + df['deltaHigh']) / 100_000
    df2['low'] = df['low'] / 100_000
    df2['close'] = (df['low'] + df['deltaClose']) / 100_000
    df2['volume'] = df['volume']
    df2.sort_values(by='timestamp', ascending=True, inplace=True)
    df2.to_csv('bars.csv', index=False)
    print('all done. shutting down...')
    reactor.stop()